package com.sisyphus.wordcount

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.types.Row

/**
 * Title: 表
 * Description: batch表
 * Author sweetdream
 * Date 2020/12/11
 *
 * 资料: https://www.cnblogs.com/yangshibiao/tag/Flink/
 * 注: 搜索resultTable.toAppendStream[(String, Double)].print("result")可以查到更多资料
 */
object TableBatch {
  def main(args: Array[String]): Unit = {
    // 1. env
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = BatchTableEnvironment.create(env)

    // 2.source
    val input = env.readTextFile(getClass.getResource("/wordcount").getPath)

    // 3. transform
    //先转换成样例类类型
    val data = input
      .flatMap(_.split(" "))
      .map(WC(_, 1))

    // 4. table
    // 创建一张表
    val dataTable: Table = tableEnv.fromDataSet(data)
    dataTable.printSchema()

    // 第一种: 调用table api进行转换
    dataTable
      .groupBy('word)
      .select('word, 'num.count as 'cnt)
      .toDataSet[Row]
      .print()

    // 第二种: sql实现
    // 注册一张临时表
    tableEnv.createTemporaryView("table1", dataTable)
    val res = tableEnv.sqlQuery("select word,count(num) as num from table1 group by word")
      .toDataSet[Row]
      .print()
  }
}
